home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / system-config-printer / troubleshoot / PrintTestPage.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-10-12  |  15.6 KB  |  480 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import cups
  5. import dbus
  6. import dbus.glib as dbus
  7. import gobject
  8. import os
  9. import pango
  10. import tempfile
  11. import time
  12. from timedops import TimedOperation
  13. from base import *
  14. import errordialogs
  15. errordialogs.set_gettext_function(_)
  16. from errordialogs import *
  17. DBUS_PATH = '/com/redhat/PrinterSpooler'
  18. DBUS_IFACE = 'com.redhat.PrinterSpooler'
  19.  
  20. class PrintTestPage(Question):
  21.     STATE = {
  22.         cups.IPP_JOB_PENDING: _('Pending'),
  23.         cups.IPP_JOB_HELD: _('Held'),
  24.         cups.IPP_JOB_PROCESSING: _('Processing'),
  25.         cups.IPP_JOB_STOPPED: _('Stopped'),
  26.         cups.IPP_JOB_CANCELED: _('Canceled'),
  27.         cups.IPP_JOB_ABORTED: _('Aborted'),
  28.         cups.IPP_JOB_COMPLETED: _('Completed') }
  29.     
  30.     def __init__(self, troubleshooter):
  31.         Question.__init__(self, troubleshooter, 'Print test page')
  32.         page = gtk.VBox()
  33.         page.set_spacing(12)
  34.         page.set_border_width(12)
  35.         label = gtk.Label()
  36.         label.set_alignment(0, 0)
  37.         label.set_use_markup(True)
  38.         label.set_line_wrap(True)
  39.         page.pack_start(label, False, False, 0)
  40.         self.main_label = label
  41.         self.main_label_text = '<span weight="bold" size="larger">' + _('Test Page') + '</span>\n\n' + _('Now print a test page.  If you are having problems printing a specific document, print that document now and mark the print job below.')
  42.         hbox = gtk.HButtonBox()
  43.         hbox.set_border_width(0)
  44.         hbox.set_spacing(3)
  45.         hbox.set_layout(gtk.BUTTONBOX_START)
  46.         self.print_button = gtk.Button(_('Print Test Page'))
  47.         hbox.pack_start(self.print_button, False, False, 0)
  48.         self.cancel_button = gtk.Button(_('Cancel All Jobs'))
  49.         hbox.pack_start(self.cancel_button, False, False, 0)
  50.         page.pack_start(hbox, False, False, 0)
  51.         tv = gtk.TreeView()
  52.         test_cell = gtk.CellRendererToggle()
  53.         test = gtk.TreeViewColumn(_('Test'), test_cell, active = 0)
  54.         job = gtk.TreeViewColumn(_('Job'), gtk.CellRendererText(), text = 1)
  55.         printer_cell = gtk.CellRendererText()
  56.         printer = gtk.TreeViewColumn(_('Printer'), printer_cell, text = 2)
  57.         name_cell = gtk.CellRendererText()
  58.         name = gtk.TreeViewColumn(_('Document'), name_cell, text = 3)
  59.         status = gtk.TreeViewColumn(_('Status'), gtk.CellRendererText(), text = 4)
  60.         test_cell.set_radio(False)
  61.         self.test_cell = test_cell
  62.         printer.set_resizable(True)
  63.         printer_cell.set_property('ellipsize', pango.ELLIPSIZE_END)
  64.         printer_cell.set_property('width-chars', 20)
  65.         name.set_resizable(True)
  66.         name_cell.set_property('ellipsize', pango.ELLIPSIZE_END)
  67.         name_cell.set_property('width-chars', 20)
  68.         status.set_resizable(True)
  69.         tv.append_column(test)
  70.         tv.append_column(job)
  71.         tv.append_column(printer)
  72.         tv.append_column(name)
  73.         tv.append_column(status)
  74.         tv.set_rules_hint(True)
  75.         sw = gtk.ScrolledWindow()
  76.         sw.set_policy(gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)
  77.         sw.set_shadow_type(gtk.SHADOW_IN)
  78.         sw.add(tv)
  79.         self.treeview = tv
  80.         page.pack_start(sw)
  81.         label = gtk.Label(_('Did the marked print jobs print correctly?'))
  82.         label.set_line_wrap(True)
  83.         label.set_alignment(0, 0)
  84.         page.pack_start(label, False, False, 0)
  85.         vbox = gtk.VBox()
  86.         vbox.set_spacing(6)
  87.         self.yes = gtk.RadioButton(label = _('Yes'))
  88.         no = gtk.RadioButton(label = _('No'))
  89.         no.set_group(self.yes)
  90.         vbox.pack_start(self.yes, False, False, 0)
  91.         vbox.pack_start(no, False, False, 0)
  92.         page.pack_start(vbox, False, False, 0)
  93.         self.persistent_answers = { }
  94.         troubleshooter.new_page(page, self)
  95.  
  96.     
  97.     def display(self):
  98.         answers = self.troubleshooter.answers
  99.         if not answers.has_key('cups_queue'):
  100.             return False
  101.         parent = self.troubleshooter.get_window()
  102.         self.authconn = answers['_authenticated_connection']
  103.         mediatype = None
  104.         defaults = answers.get('cups_printer_ppd_defaults', { })
  105.         for opts in defaults.values():
  106.             for opt, value in opts.iteritems():
  107.                 if opt == 'MediaType':
  108.                     mediatype = value
  109.                     break
  110.                     continue
  111.                 answers.has_key('cups_queue')
  112.             
  113.         
  114.         if mediatype != None:
  115.             mediatype_string = '\n\n' + _("Remember to load paper of type '%s' into the printer first.") % mediatype
  116.         else:
  117.             mediatype_string = ''
  118.         label_text = self.main_label_text + mediatype_string
  119.         self.main_label.set_markup(label_text)
  120.         model = gtk.ListStore(gobject.TYPE_BOOLEAN, gobject.TYPE_INT, gobject.TYPE_STRING, gobject.TYPE_STRING, gobject.TYPE_STRING)
  121.         self.treeview.set_model(model)
  122.         self.job_to_iter = { }
  123.         test_jobs = self.persistent_answers.get('test_page_job_id', [])
  124.         
  125.         def get_jobs():
  126.             c = self.authconn
  127.             jobs_dict = c.getJobs(which_jobs = 'not-completed', my_jobs = False)
  128.             completed_jobs_dict = c.getJobs(which_jobs = 'completed')
  129.             return (jobs_dict, completed_jobs_dict)
  130.  
  131.         self.op = TimedOperation(get_jobs, parent = parent)
  132.         (jobs_dict, completed_jobs_dict) = self.op.run()
  133.         
  134.         try:
  135.             queue_uri_ending = '/' + self.troubleshooter.answers['cups_queue']
  136.             jobs_on_this_printer = ((None,), filter)((lambda x: jobs_dict[x]['job-printer-uri'].endswith(queue_uri_ending)), jobs_dict.keys())
  137.         except:
  138.             (None,)
  139.             jobs_on_this_printer = []
  140.  
  141.         jobs = list(set(test_jobs).union(set(jobs_on_this_printer)))
  142.         completed_jobs_dict = None
  143.         for job in jobs:
  144.             
  145.             try:
  146.                 j = jobs_dict[job]
  147.             except KeyError:
  148.                 (None,)
  149.                 (None,)
  150.                 
  151.                 try:
  152.                     j = completed_jobs_dict[job]
  153.                 except KeyError:
  154.                     continue
  155.                 except:
  156.                     None<EXCEPTION MATCH>KeyError
  157.                 
  158.  
  159.                 None<EXCEPTION MATCH>KeyError
  160.  
  161.             iter = model.append(None)
  162.             self.job_to_iter[job] = iter
  163.             model.set_value(iter, 0, job in test_jobs)
  164.             model.set_value(iter, 1, job)
  165.             self.update_job(job, j)
  166.         
  167.         return True
  168.  
  169.     
  170.     def connect_signals(self, handler):
  171.         self.print_sigid = self.print_button.connect('clicked', self.print_clicked)
  172.         self.cancel_sigid = self.cancel_button.connect('clicked', self.cancel_clicked)
  173.         self.test_sigid = self.test_cell.connect('toggled', self.test_toggled)
  174.         
  175.         def create_subscription():
  176.             c = self.authconn
  177.             sub_id = c.createSubscription('/', events = [
  178.                 'job-created',
  179.                 'job-completed',
  180.                 'job-stopped',
  181.                 'job-progress',
  182.                 'job-state-changed'])
  183.             return sub_id
  184.  
  185.         parent = self.troubleshooter.get_window()
  186.         self.op = TimedOperation(create_subscription, parent = parent)
  187.         self.sub_id = self.op.run()
  188.         
  189.         try:
  190.             bus = dbus.SystemBus()
  191.         except:
  192.             (None,)
  193.             bus = None
  194.  
  195.         self.bus = bus
  196.         if bus:
  197.             bus.add_signal_receiver(self.handle_dbus_signal, path = DBUS_PATH, dbus_interface = DBUS_IFACE)
  198.         
  199.         self.timer = gobject.timeout_add(1000, self.update_jobs_list)
  200.  
  201.     
  202.     def disconnect_signals(self):
  203.         if self.bus:
  204.             self.bus.remove_signal_receiver(self.handle_dbus_signal, path = DBUS_PATH, dbus_interface = DBUS_IFACE)
  205.         
  206.         self.print_button.disconnect(self.print_sigid)
  207.         self.cancel_button.disconnect(self.cancel_sigid)
  208.         self.test_cell.disconnect(self.test_sigid)
  209.         
  210.         def cancel_subscription(sub_id):
  211.             c = self.authconn
  212.             c.cancelSubscription(sub_id)
  213.  
  214.         parent = self.troubleshooter.get_window()
  215.         self.op = TimedOperation(cancel_subscription, (self.sub_id,), parent = parent)
  216.         self.op.run()
  217.         
  218.         try:
  219.             del self.sub_seq
  220.         except:
  221.             (None,)
  222.  
  223.         gobject.source_remove(self.timer)
  224.  
  225.     
  226.     def collect_answer(self):
  227.         if not self.displayed:
  228.             return { }
  229.         self.answers = self.persistent_answers.copy()
  230.         parent = self.troubleshooter.get_window()
  231.         success = self.yes.get_active()
  232.         self.answers['test_page_successful'] = success
  233.         
  234.         class collect_jobs:
  235.             
  236.             def __init__(self, model):
  237.                 self.jobs = []
  238.                 model.foreach(self.each, None)
  239.  
  240.             
  241.             def each(self, model, path, iter, user_data):
  242.                 self.jobs.append(model.get(iter, 0, 1, 2, 3, 4))
  243.  
  244.  
  245.         model = self.treeview.get_model()
  246.         jobs = collect_jobs(model).jobs
  247.         
  248.         def collect_attributes(jobs):
  249.             job_attrs = None
  250.             c = self.authconn
  251.             with_attrs = []
  252.             for test, jobid, printer, doc, status in jobs:
  253.                 attrs = None
  254.                 if test:
  255.                     
  256.                     try:
  257.                         attrs = c.getJobAttributes(jobid)
  258.                     except AttributeError:
  259.                         if job_attrs == None:
  260.                             job_attrs = c.getJobs(which_jobs = 'all')
  261.                         
  262.                         attrs = self.job_attrs[jobid]
  263.                     except:
  264.                         None<EXCEPTION MATCH>AttributeError
  265.                     
  266.  
  267.                 None<EXCEPTION MATCH>AttributeError
  268.                 with_attrs.append((test, jobid, printer, doc, status, attrs))
  269.             
  270.             return with_attrs
  271.  
  272.         self.op = TimedOperation(collect_attributes, (jobs,), parent = parent)
  273.         with_attrs = self.op.run()
  274.         self.answers['test_page_job_status'] = with_attrs
  275.         return self.answers
  276.  
  277.     
  278.     def cancel_operation(self):
  279.         self.op.cancel()
  280.         answers = self.troubleshooter.answers
  281.         factory = answers['_authenticated_connection_factory']
  282.         self.authconn = factory.get_connection()
  283.         self.answers['_authenticated_connection'] = self.authconn
  284.  
  285.     
  286.     def handle_dbus_signal(self, *args):
  287.         debugprint('D-Bus signal caught: updating jobs list soon')
  288.         gobject.source_remove(self.timer)
  289.         self.timer = gobject.timeout_add(200, self.update_jobs_list)
  290.  
  291.     
  292.     def update_job(self, jobid, job_dict):
  293.         iter = self.job_to_iter[jobid]
  294.         model = self.treeview.get_model()
  295.         
  296.         try:
  297.             printer_name = job_dict['printer-name']
  298.         except KeyError:
  299.             
  300.             try:
  301.                 uri = job_dict['job-printer-uri']
  302.                 r = uri.rfind('/')
  303.                 printer_name = uri[r + 1:]
  304.             except KeyError:
  305.                 printer_name = None
  306.             except:
  307.                 None<EXCEPTION MATCH>KeyError
  308.             
  309.  
  310.             None<EXCEPTION MATCH>KeyError
  311.  
  312.         if printer_name != None:
  313.             model.set_value(iter, 2, printer_name)
  314.         
  315.         model.set_value(iter, 3, job_dict['job-name'])
  316.         model.set_value(iter, 4, self.STATE[job_dict['job-state']])
  317.  
  318.     
  319.     def print_clicked(self, widget):
  320.         now = time.time()
  321.         tt = time.localtime(now)
  322.         when = time.strftime('%d/%b/%Y:%T %z', tt)
  323.         self.persistent_answers['test_page_attempted'] = when
  324.         answers = self.troubleshooter.answers
  325.         parent = self.troubleshooter.get_window()
  326.         
  327.         def print_test_page(*args, **kwargs):
  328.             factory = answers['_authenticated_connection_factory']
  329.             c = factory.get_connection()
  330.             return c.printTestPage(*args, **kwargs)
  331.  
  332.         tmpfname = None
  333.         mimetypes = [
  334.             None,
  335.             'text/plain']
  336.         for mimetype in mimetypes:
  337.             
  338.             try:
  339.                 if mimetype == None:
  340.                     self.op = TimedOperation(print_test_page, (answers['cups_queue'],), parent = parent)
  341.                     jobid = self.op.run()
  342.                 elif mimetype == 'text/plain':
  343.                     (tmpfd, tmpfname) = tempfile.mkstemp()
  344.                     os.write(tmpfd, 'This is a test page.\n')
  345.                     os.close(tmpfd)
  346.                     self.op = TimedOperation(print_test_page, (answers['cups_queue'],), kwargs = {
  347.                         'file': tmpfname,
  348.                         'format': mimetype }, parent = parent)
  349.                     jobid = self.op.run()
  350.                     
  351.                     try:
  352.                         os.unlink(tmpfname)
  353.                     except OSError:
  354.                         pass
  355.  
  356.                     tmpfname = None
  357.                 
  358.                 jobs = self.persistent_answers.get('test_page_job_id', [])
  359.                 jobs.append(jobid)
  360.                 self.persistent_answers['test_page_job_id'] = jobs
  361.             continue
  362.             except RuntimeError:
  363.                 (None,)
  364.                 (None,)
  365.                 self.persistent_answers['test_page_submit_failure'] = 'connect'
  366.                 break
  367.                 continue
  368.                 except cups.IPPError:
  369.                     (e, s) = None
  370.                     if e == cups.IPP_DOCUMENT_FORMAT and mimetypes.index(mimetype) < len(mimetypes) - 1:
  371.                         if tmpfname != None:
  372.                             os.unlink(tmpfname)
  373.                             tmpfname = None
  374.                             continue
  375.                         continue
  376.                     
  377.                     self.persistent_answers['test_page_submit_failure'] = (e, s)
  378.                     show_error_dialog(_('Error submitting test page'), _("There was an error during the CUPS operation: '%s'.") % s, self.troubleshooter.get_window())
  379.                     break
  380.                     continue
  381.                 
  382.             return None
  383.  
  384.  
  385.     
  386.     def cancel_clicked(self, widget):
  387.         self.persistent_answers['test_page_jobs_cancelled'] = True
  388.         jobids = []
  389.         for jobid, iter in self.job_to_iter.iteritems():
  390.             jobids.append(jobid)
  391.         
  392.         
  393.         def cancel_jobs(jobids):
  394.             c = self.authconn
  395.             for jobid in jobids:
  396.                 
  397.                 try:
  398.                     c.cancelJob(jobid)
  399.                 continue
  400.                 except cups.IPPError:
  401.                     (e, s) = None
  402.                     if e != cups.IPP_NOT_POSSIBLE:
  403.                         self.persistent_answers['test_page_cancel_failure'] = (e, s)
  404.                     
  405.                     e != cups.IPP_NOT_POSSIBLE
  406.                 
  407.  
  408.             
  409.  
  410.         self.op = TimedOperation(cancel_jobs, (jobids,), parent = self.troubleshooter.get_window())
  411.         self.op.run()
  412.  
  413.     
  414.     def test_toggled(self, cell, path):
  415.         model = self.treeview.get_model()
  416.         iter = model.get_iter(path)
  417.         active = model.get_value(iter, 0)
  418.         model.set_value(iter, 0, not active)
  419.  
  420.     
  421.     def update_jobs_list(self):
  422.         
  423.         def get_notifications(self):
  424.             c = self.authconn
  425.             
  426.             try:
  427.                 notifications = c.getNotifications([
  428.                     self.sub_id], [
  429.                     self.sub_seq + 1])
  430.             except AttributeError:
  431.                 notifications = c.getNotifications([
  432.                     self.sub_id])
  433.  
  434.             return notifications
  435.  
  436.         gtk.gdk.threads_enter()
  437.         parent = self.troubleshooter.get_window()
  438.         self.op = TimedOperation(get_notifications, (self,), parent = parent)
  439.         notifications = self.op.run()
  440.         answers = self.troubleshooter.answers
  441.         model = self.treeview.get_model()
  442.         queue = answers['cups_queue']
  443.         test_jobs = self.persistent_answers.get('test_page_job_id', [])
  444.         for event in notifications['events']:
  445.             seq = event['notify-sequence-number']
  446.             
  447.             try:
  448.                 if seq <= self.sub_seq:
  449.                     continue
  450.             except AttributeError:
  451.                 pass
  452.  
  453.             self.sub_seq = seq
  454.             job = event['notify-job-id']
  455.             nse = event['notify-subscribed-event']
  456.             if nse == 'job-created':
  457.                 if job in test_jobs or event['printer-name'] == queue:
  458.                     iter = model.append(None)
  459.                     self.job_to_iter[job] = iter
  460.                     model.set_value(iter, 0, True)
  461.                     model.set_value(iter, 1, job)
  462.                 
  463.             elif not self.job_to_iter.has_key(job):
  464.                 continue
  465.             
  466.             if job in test_jobs and nse in ('job-stopped', 'job-completed'):
  467.                 comp = self.persistent_answers.get('test_page_completions', [])
  468.                 comp.append((job, event['notify-text']))
  469.                 self.persistent_answers['test_page_completions'] = comp
  470.             
  471.             self.update_job(job, event)
  472.         
  473.         gobject.source_remove(self.timer)
  474.         self.timer = gobject.timeout_add(1000 * notifications['notify-get-interval'], self.update_jobs_list)
  475.         debugprint('Update again in %ds' % notifications['notify-get-interval'])
  476.         gtk.gdk.threads_leave()
  477.         return False
  478.  
  479.  
  480.